2018年以降の記事はGitHub Pagesに移行しました

Fluentdのプラグインを作成してみる(練習用)

前回までのあらすじ

Fluentdというログ収集ツールを使ってApacheのログを取得するまで - kk_Atakaの日記 で、confファイルのコメントアウトを外してとりあえず動くっていうところまではいけた。今度は簡単なプラグインを作ってみる。

ひたすら何かを吐き続けるプラグイン

Inputプラグインの場合

Fluent::Inputクラスを継承する。

class SimpleInput < Fluent::Input
  # 第一引数がプラグインの名前、<source> typeに指定される
  Fluent::Plugin.register_input("simple_in", self)

  def start
    @thread = Thread.new(&method(:run))
  end

  def run
    loop do
      # emitメソッドの第一引数が <match **.**> の**に該当すればその形式で出力される
      # fluent.confでdebug.**はstdoutで出力すると定義されている
      Fluent::Engine.emit("debug.debug", Fluent::Engine.now, {"simple" => "debudebu"})
      # fluent.confに<match simple.output> type simple_outを定義したので、以下のOutput形式で出力される
      Fluent::Engine.emit("simple.output", Fluent::Engine.now, {"simple" => "simout"})
      sleep(1)
    end
  end
end

上のソースには書かれていないけど、他にもメソッドがいくつか。(公式サイトより)

  • configure メソッド
    • スタート前に呼び出される
    • confハッシュにパラメータを入れられる?
    • エラーはFluent::ConfigErrorを投げる
  • start メソッド
    • スタート時に呼ばれる
    • ここでスレッドを作ったりファイルをオープンしたり
  • shutdown メソッド
    • シャットダウン時に呼ばれる
    • スレッドやファイルのクローズはここで。
  • イベントをsubmitするためにはFluent::Engine.emit(tag, time, record)メソッドを使う
    • tagはString、timeはUnixTime、recordはハッシュ。
Outputプラグインの場合

まず種類がいろいろ。

  • Buffered output plugins Fluent::BufferedOutputクラスを継承
  • Time sliced output plugins Fluent::TimeSlicedOutputクラスを継承/Buffered output pluginを継承したプラグイン?
  • Non-buffered output plugins Fluent::Outputクラスを継承

今回はFluent::Outputを使った。

class SimpleOutput < Fluent::Output
  # 第一引数がプラグインの名前、<match> の typeに指定される
  Fluent::Plugin.register_output("simple_out", self)

  def emit(tag, es, chain)
    chain.next
    es.each do |time, record|
      # 出力内容
      puts "simple_out: #{time} - #{record}"
    end
  end
end
  • configure, start, shutdown はInputプラグイン同様にある
  • emit メソッドはイベントに到達した場合呼ばれる
    • esはFluent::EventStreamオブジェクト(イベントが入ってる?)
      • eachで回すとイベントを検索できる。
    • chainはトランザクションメッセージのオブジェクト。
      • 適切なポイントでchain.nextを呼ぶとエラーを吐いたときにrollbackしてくれる。
プラグインをデバッグしたい場合
  • fluentdコマンド実行するときに-vvオプションを指定する事でデバッグメッセージが表示できる。
  • デバッグにはstdoutかcopyが役に立つ
    • stdoutはマッチしたイベントをコンソールに出力する
    • copyはマッチしたイベントを複数のプラグインにコピーできる
      • stdoutと一緒に使う事ができる

Configファイル

# Inputプラグイン指定
<source>
  type simple_in
</source>

# Fluent::Engine.emit("simple.output", Fluent::Engine.now, {"simple" => "simout"})
# simple.outputタグ指定した方はsimple_output(Outputプラグイン)出力にマッチする
<match simple.output>
  type simple_out
</match>

#
## match tag=debug.** and dump to console
# Fluent::Engine.emit("debug.debug", Fluent::Engine.now, {"simple" => "debudebu"})  
# debug.debugタグ指定した方はstdout出力にマッチする
<match debug.**>
  type stdout
</match>

これを実行すると…。

$ bundle exec fluentd -c /home/kk_Ataka/fluentd/fluent.conf -p /home/kk_Ataka/fluentd/plugin/
2012-08-17 22:19:41 +0900: starting fluentd-0.10.24
2012-08-17 22:19:41 +0900: reading config file path="/home/kk_Ataka/fluentd/fluent.conf"
(略)
2012-08-17 22:19:41 +0900: following tail of /usr/local/apache2/logs/access_log
2012-08-17 22:19:41 +0900: following tail of /usr/local/apache2/logs/error_log
2012-08-17 22:19:41 +0900 debug.debug: {"simple":"debudebu"}
simple_out: 1345209581 - {"simple"=>"simout"}
2012-08-17 22:19:42 +0900 debug.debug: {"simple":"debudebu"}
simple_out: 1345209582 - {"simple"=>"simout"}
2012-08-17 22:19:43 +0900 debug.debug: {"simple":"debudebu"}
simple_out: 1345209583 - {"simple"=>"simout"}
2012-08-17 22:19:44 +0900 debug.debug: {"simple":"debudebu"}
simple_out: 1345209584 - {"simple"=>"simout"}
2012-08-17 22:19:45 +0900 debug.debug: {"simple":"debudebu"}
simple_out: 1345209585 - {"simple"=>"simout"}
2012-08-17 22:19:46 +0900 debug.debug: {"simple":"debudebu"}
simple_out: 1345209586 - {"simple"=>"simout"}

1秒刻みでプラグインに記述した内容が出力される。こっちが標準のstdout出力。

2012-08-17 22:19:46 +0900 debug.debug: {"simple":"debudebu"}

こっちが自分で記述したOutputプラグイン書式の出力。

simple_out: 1345209586 - {"simple"=>"simout"}